/*
 * Collie - An asynchronous event-driven network framework using Dlang development
 *
 * Copyright (C) 2015-2017 Shanghai Putao Technology Co., Ltd
 *
 * Developer: putao's Dlang team
 *
 * Licensed under the Apache-2.0 License.
 *
 */
module collie.channel.tcpsockethandler;

import collie.net;
import collie.channel.handler;
import collie.channel.handlercontext;
import kiss.net;
import kiss.exception;
import kiss.net.TcpStream;
import kiss.event.task;

/**
 * Pipeline handler that connects a pipeline to a TcpStream.
 *
 * Data received on the stream is fired into the pipeline as read events,
 * StreamWriteBuffer objects written through the pipeline are handed to the
 * stream, and the stream's closed notification is turned into a
 * transport-inactive event followed by deletion of the pipeline.
 */
final @trusted class TCPSocketHandler : HandlerAdapter!(const(ubyte[]), StreamWriteBuffer)
{
    /// Creates the handler and binds it to `sock` (see restSocket).
    this(TcpStream sock)
    {
        restSocket(sock);
    }

    /// The stream currently bound to this handler; null once it has been closed.
    @property tcpSocket()
    {
        return _socket;
    }

    /// Binds the handler to `sock` and remembers the event loop that stream runs on.
    void restSocket(TcpStream sock)
    {
        _socket = sock;
        _loop = cast(EventLoop) sock.eventLoop();
    }

    /// Installs the stream callbacks, starts the stream, then forwards the event.
    override void transportActive(Context ctx)
    {
        attachReadCallback();
        _socket.start();
        ctx.fireTransportActive();
    }

    /**
     * When the callbacks are attached and a stream is bound, only closes the
     * stream; the inactive event is then fired from closeCallBack. Otherwise
     * the event is forwarded right away.
     */
    override void transportInactive(Context ctx)
    {
        if (!_isAttch || _socket is null)
        {
            ctx.fireTransportInactive();
            return;
        }
        _socket.close();
    }

    /**
     * Writes `buffer` to the stream. The write is performed directly when the
     * caller is already on the loop thread, otherwise it is posted to the loop
     * as a task. `cback` is not used by this implementation.
     */
    override void write(Context ctx, StreamWriteBuffer buffer, TheCallBack cback = null)
    {
        if (!_loop.isInLoopThread())
        {
            _loop.postTask(newTask(&_postWrite, buffer));
            return;
        }
        _postWrite(buffer);
    }

    /// Requests a close by posting the close task to the event loop.
    override void close(Context ctx)
    {
        _loop.postTask(newTask(&_postClose));
    }

protected:
    /// Marks the handler as attached, registers the data/closed callbacks on the
    /// stream and publishes the stream as the pipeline's transport.
    void attachReadCallback()
    {
        _isAttch = true;
        _socket.onDataReceived(&readCallBack);
        _socket.onClosed(&closeCallBack);
        context.pipeline.transport(_socket);
    }

    /**
     * Closed callback registered on the stream. Clears the attached flag, then
     * (with exceptions caught and logged) fires the inactive event, detaches
     * the transport and the callbacks, drops the stream reference and deletes
     * the pipeline.
     */
    void closeCallBack() nothrow
    {
        _isAttch = false;
        catchAndLogException((){
            context.fireTransportInactive();
            context.pipeline.transport(null);
            _socket.onDataReceived(null);
            _socket.onClosed(null);
            _socket = null;
            context.pipeline.deletePipeline();
        }());
    }

    /// Data callback registered on the stream: fires `buf` into the pipeline as
    /// a read event, with exceptions caught and logged.
    void readCallBack(in ubyte[] buf) nothrow
    {
        catchAndLogException(context.fireRead(buf));
    }

private:
    /// Task body for close(): closes the stream if one is still bound.
    final void _postClose()
    {
        if (_socket is null)
            return;
        _socket.close();
    }

    /**
     * Performs the actual write. Without a bound stream the buffer is finished
     * immediately; otherwise the pipeline manager's timeout (if there is a
     * manager) is refreshed before the buffer is handed to the stream.
     */
    final void _postWrite(StreamWriteBuffer buffer)
    {
        if (_socket is null)
        {
            buffer.doFinish();
            return;
        }

        if (context.pipeline.pipelineManager)
            context.pipeline.pipelineManager.refreshTimeout();

        _socket.write(buffer);
    }

    bool _isAttch = false; // true while the stream callbacks are installed
    TcpStream _socket;     // bound stream; set to null by closeCallBack
    EventLoop _loop;       // event loop obtained from the bound stream
}